#include <time.h>
#include <thread>
#include "msgdecoder.h"
#include "messageutil.h"
#include "command_config.h"
#include "../inc/logger.h"

MsgDecoder :: MsgDecoder(int queueSize,bool isServer)
{
	if(queueSize<1)
	    queueSize=64;
	mSequenceQueue = new SafeQueue(queueSize);
	mNormalQueue = new SafeQueue(queueSize);
	mPriorityQueue = new SafeQueue(queueSize);
	//mHeartbeatQueue = new SafeQueue(1);
	heartbeat=false;
	this->isServer=isServer;
#ifdef DEBUG_CODE
	totalCount=0;
#endif
}

MsgDecoder :: ~MsgDecoder()
{
	//LOG4_WARN("SyncQueue size:%d ,AsyncQueue size:%d",getSyncQueue()->getLength(),getAsyncQueue()->getLength());
	for (; NULL != mSequenceQueue->top();) {
		delete((Package*)mSequenceQueue->pop());
	}
	delete mSequenceQueue;
	mSequenceQueue = NULL;

	for (; NULL != mNormalQueue->top();) {
		delete((Package*)mNormalQueue->pop());
	}
	delete mNormalQueue;
	mNormalQueue = NULL;

	for (; NULL != mPriorityQueue->top();) {
		delete((Package*)mPriorityQueue->pop());
	}
	delete mPriorityQueue;
	mPriorityQueue = NULL;

	/*for (; NULL != mHeartbeatQueue->top();) {
		delete((Package*)mHeartbeatQueue->pop());
	}
	delete mHeartbeatQueue;
	mHeartbeatQueue = NULL;*/
}

Package* MsgDecoder::decode(struct evbuffer *evbuf)
{
	
#ifdef _WIN32
#define MAXTIME  10
#else
#define MAXTIME  1
#endif
	int count = 0;
	bool hasOutput = false;
	Package* package = NULL;
	for (;;) {
		size_t total = evbuffer_get_length(evbuf);
		size_t size2=evbuffer_get_contiguous_space(evbuf);
		if (total<sizeof(PackageHeader)) break;
		char headerbuf[sizeof(PackageHeader)];
		evbuffer_copyout(evbuf, headerbuf, sizeof(PackageHeader));
		if (headerbuf[0] != PackageFlag || headerbuf[1] != PackageFlag)
		{
			if (headerbuf[1] != PackageFlag)
				evbuffer_drain(evbuf,2);
			else
				evbuffer_drain(evbuf, 1);
			if (!hasOutput)
			{
				LOG4_ERROR("decode error,forgot set buffer size?,see bellow command.");
				/*sysctl - w net.core.wmem_default = 1024000
				sysctl - w net.core.wmem_max = 1024000
				sysctl - w net.core.rmem_default = 1024000
				sysctl - w net.core.rmem_max = 1024000*/
			}
			hasOutput = true;
			continue;
		}
		if (!PackageHeader::CheckCrc(headerbuf))
		{
			evbuffer_drain(evbuf, 1);
			LOG4_ERROR("crc error in package header.");
			continue;
		}
		PackageHeader header;
		header.read((const char*)headerbuf);
		if (header.offset()>sizeof(PackageHeader))
		{
			evbuffer_drain(evbuf, sizeof(PackageHeader));
			LOG4_ERROR("offset in header error,offset=%d,%s", header.offset(), header.DebugString().c_str());
			break;
		}
		if (header.bodysize()>10000000)//10M
		{
			evbuffer_drain(evbuf, 1);
			LOG4_ERROR("decode error,too big bodysize,header:%s", header.DebugString().c_str());
			continue;
		}
		if (header.type() != PackageHeader::Request && header.type() != PackageHeader::Response && header.type() != PackageHeader::Publish)
		{
			evbuffer_drain(evbuf, 1);
			LOG4_ERROR("decode error,error type,header:%s", header.DebugString().c_str());
			continue;
		}
		
		if (header.bodysize() + header.offset() > total)
			break;
		
		if (header.bodysize()>200000)
			LOG4_INFO("Finished receive Big msg,body size=%d,header:%s", header.bodysize(), header.DebugString().c_str());
		int readSize = header.bodysize() + header.offset();
		unsigned char* data = evbuffer_pullup(evbuf, readSize);
		if (!data)
		{
			LOG4_ERROR("out of memory,free mem %d",total);
			evbuffer_drain(evbuf, total);
			break;
		}
		std::string type;
			type = CommandConfig::getInstance().GetClass(header.command());
		package = new Package(header, type, (const char*)data, header.bodysize() + header.offset(), MessagePtr());
		evbuffer_drain(evbuf, readSize);
		hasOutput = false;
		count++;
		break;
	}

	return package;
}

int MsgDecoder::decode2(struct evbuffer * evbuf)
{
#ifdef _WIN32
    #define MAXTIME  10
#else
	#define MAXTIME  1
#endif
	int ret = 0;
	int count=0;
	bool hasOutput=false;
	for( ; ; ) {
		size_t size = evbuffer_get_length(evbuf);
		if (size<sizeof(PackageHeader)) break;
		unsigned char* data = evbuffer_pullup(evbuf, sizeof(PackageHeader));
		if(data[0]!=PackageFlag || data[1]!=PackageFlag)
		{
			if(data[1]!=PackageFlag)
				evbuffer_drain(evbuf, 2);
			else
				evbuffer_drain(evbuf, 1);
			if (!hasOutput)
			{
				LOG4_ERROR("decode error,forgot set buffer size?,see bellow command.");
				/*sysctl - w net.core.wmem_default = 1024000
					sysctl - w net.core.wmem_max = 1024000
					sysctl - w net.core.rmem_default = 1024000
					sysctl - w net.core.rmem_max = 1024000*/
			}
			hasOutput=true;
			continue;
		}
		if (!PackageHeader::CheckCrc((const char*)data))
		{
			evbuffer_drain(evbuf, 1);
			LOG4_ERROR("crc error in package header.");
			continue;
		}
		PackageHeader header;
		header.read((const char*)data);
		if (header.offset()>sizeof(PackageHeader))
		{
			evbuffer_drain(evbuf, sizeof(PackageHeader));
			LOG4_ERROR("offset in header error,offset=%d,%s", header.offset(), header.DebugString().c_str());
			break;
		}
		if (header.bodysize()>10000000)
		{
			evbuffer_drain(evbuf, 1);
			LOG4_ERROR("decode error,too big bodysize,header:%s",header.DebugString().c_str());
			continue;
		}
		if (header.type() != PackageHeader::Request && header.type() != PackageHeader::Response && header.type() != PackageHeader::Publish)
		{
			evbuffer_drain(evbuf, 1);
			LOG4_ERROR("decode error,error type,header:%s",header.DebugString().c_str());
			continue;
		}
		if (header.bodysize() + header.offset()>size)
			break;

		if (header.bodysize()>200000)
			LOG4_INFO("Finished receive Big msg,body size=%d,header:%s", header.bodysize(), header.DebugString().c_str());
		int readSize = header.bodysize() + header.offset();
		data = evbuffer_pullup(evbuf, readSize);
		if (!data)
		{
			LOG4_ERROR("out of memory,free mem %d", size);
			evbuffer_drain(evbuf, size);
			break;
		}
		std::string type;
			type = CommandConfig::getInstance().GetClass(header.command());
		Package* package = new Package(header, type, (const char*)data, header.bodysize() + header.offset(), MessagePtr());
		evbuffer_drain(evbuf, readSize);
		if(type==MsgExpress::HeartBeatResponse::default_instance().GetTypeName())
		{
			mPriorityQueue->push(package);
			heartbeat=true;
			//LOG4_INFO("Receive heartbeat.");
		}
		else if(type==MsgExpress::RestartApp::default_instance().GetTypeName() && !isServer)
		{
			LOG4_WARN("I will restart.");
			std::this_thread::sleep_for(std::chrono::milliseconds(1000));
			exit(0);
		}
		else if(header.issequence() || header.ismultipage())
		{
		    LOG4_DEBUG("Decode a sequence msg:%s",package->DebugString().c_str());
			mSequenceQueue->push(package);
			int size = mSequenceQueue->getLength();
			if((size>0 && size%1000==0) || (size>1000 && size%100==0))
				LOG4_WARN("Sequence queue size:%d",size);
		}
		else if (header.ispriority())
		{
			LOG4_DEBUG("Decode a priority msg:%s", package->DebugString().c_str());
			mPriorityQueue->push(package);
		}
		else
		{
		    LOG4_DEBUG("Decode a none sequence msg:%s",package->DebugString().c_str());
			mNormalQueue->push(package);
			int size = mNormalQueue->getLength();
			if((size>0 && size%1000==0) || (size>1000 && size%100==0))
				LOG4_WARN("Queue size:%d",size);
		}
		hasOutput=false;
		count++;
	}

	return ret;
}

SafeQueue * MsgDecoder :: getSequenceQueue()
{
	return mSequenceQueue;
}

SafeQueue * MsgDecoder :: getNormalQueue()
{
	return mNormalQueue;
}

SafeQueue * MsgDecoder::getPriorityQueue()
{
	return mPriorityQueue;
}
//
//SafeQueue * MsgDecoder :: getHeartbeatQueue()
//{
//	return mHeartbeatQueue;
//}

bool MsgDecoder :: hasHeartbeat()
{
	bool ret=heartbeat;
	heartbeat=false;
	return ret;
}
